Conversation
- Propagate trigger-time tags through SFN and Argo workflow executions via the `metaflow.trigger_tags` parameter and `METAFLOW_TRIGGER_TAGS` env var
- Tags set at trigger time are available to all steps in the workflow
- Resume operations preserve and propagate the original run's tags

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Tests cover METAFLOW_TRIGGER_TAGS env var parsing, Argo/SFN trigger CLI --tag option, SFN execution input format, and resume tag merge logic.
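As a rough illustration of the env var parsing being tested, a step-side reader might look like the sketch below. The helper name `parse_trigger_tags` is hypothetical (not Metaflow's actual API); the only assumption carried over from this PR is that `METAFLOW_TRIGGER_TAGS` holds a JSON-encoded list of tag strings.

```python
import json
import os

def parse_trigger_tags(env=os.environ):
    # METAFLOW_TRIGGER_TAGS carries a JSON-encoded list of tag strings,
    # e.g. '["team:ml", "env:prod"]'. Missing or empty means no tags.
    raw = env.get("METAFLOW_TRIGGER_TAGS", "")
    if not raw:
        return []
    return json.loads(raw)

print(parse_trigger_tags({"METAFLOW_TRIGGER_TAGS": '["team:ml", "env:prod"]'}))
```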
Force-pushed from ef75451 to 04dcd2b
Greptile Summary

This PR propagates trigger-time tags through SFN and Argo workflow executions via the `metaflow.trigger_tags` parameter and `METAFLOW_TRIGGER_TAGS` env var.

Confidence Score: 3/5

Not safe to merge for flows using @schedule with SFN — EventBridge-triggered executions will fail at the start step after redeployment. The P1 finding (missing TriggerTags in EventBridge input) is a definite current breakage on the changed path for any SFN flow with @schedule. All other findings are P2. The Argo path is safe.

Important Files Changed

- `metaflow/plugins/aws/step_functions/event_bridge_client.py` — must be updated to include TriggerTags in the EventBridge execution input.
Reviews (1): Last reviewed commit: "cleanup: remove duplicate import, unused..."
```python
# Propagate trigger-time tags from execution input to all steps.
# The trigger command always includes TriggerTags in the input.
attrs["metaflow.trigger_tags.$"] = "$.TriggerTags"
env["METAFLOW_TRIGGER_TAGS"] = "$.TriggerTags"
```
EventBridge-scheduled executions will fail after redeployment
After redeploying a state machine with these changes, any execution that doesn't include TriggerTags in the input will throw a States.Runtime error when Step Functions tries to resolve $.TriggerTags. This includes all EventBridge-scheduled runs: event_bridge_client.py (line 57) passes {"Parameters": json.dumps({})} as the execution input — no TriggerTags key — so the JsonPath lookup fails immediately.
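The failure mode can be sketched in plain Python with a toy stand-in for the JsonPath lookup (the `resolve` helper is illustrative, not how Step Functions is implemented; the input dict mirrors what `event_bridge_client.py` sends):

```python
import json

# Execution input as event_bridge_client.py currently sends it:
# only a "Parameters" key, no "TriggerTags".
scheduled_input = json.loads(json.dumps({"Parameters": json.dumps({})}))

def resolve(path, doc):
    # Toy stand-in for resolving a JsonPath like "$.TriggerTags" against
    # the execution input; Step Functions raises States.Runtime when the
    # referenced key is absent.
    key = path[len("$."):]
    if key not in doc:
        raise KeyError("States.Runtime: %s not present in input" % path)
    return doc[key]

try:
    resolve("$.TriggerTags", scheduled_input)
except KeyError as e:
    print(e)  # the scheduled run fails before the start step does any work
```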
The Argo path avoids this by setting .value("[]") as a default on the workflow template parameter. SFN needs the same defensive default, which means updating EventBridgeClient._set() to include "TriggerTags": json.dumps([]) in its Input:
```python
"Input": json.dumps(
    {"Parameters": json.dumps({}), "TriggerTags": json.dumps([])}
),
```

```python
try:
    from ..client.core import Run

    origin_run = Run("%s/%s" % (flow_name, origin_run_id), _namespace_check=False)
    return list(origin_run.user_tags)
except Exception:
    # If we can't read the origin run's tags (e.g. metadata service
```
Overly broad exception suppression
The bare except Exception swallows everything — including AttributeError, ImportError, or other programming errors that signal a real bug. Consider catching only the expected failure modes (e.g. MetaflowNotFound, network errors) and letting unexpected exceptions surface, or at minimum logging a warning so callers know tags were silently dropped.
```diff
 try:
     from ..client.core import Run
     origin_run = Run("%s/%s" % (flow_name, origin_run_id), _namespace_check=False)
     return list(origin_run.user_tags)
-except Exception:
-    # If we can't read the origin run's tags (e.g. metadata service
+except Exception as e:
+    # If we can't read the origin run's tags (e.g. metadata service
+    # unavailable), we proceed without propagating tags.
+    import logging
+
+    logging.getLogger(__name__).debug(
+        "Could not retrieve tags for origin run %s/%s: %s",
+        flow_name,
+        origin_run_id,
+        e,
+    )
     return []
```
```diff
-# Dump parameters into `Parameters` input field.
-input = json.dumps({"Parameters": json.dumps(parameters)})
+# Always include TriggerTags (defaulting to empty list) in the
+# execution input. The state machine propagates this field through
+# every step so that trigger-time tags are applied to all tasks.
+input = json.dumps(
+    {
+        "Parameters": json.dumps(parameters),
+        "TriggerTags": json.dumps(tags if tags else []),
+    }
+)
```
TriggerTags double-JSON-encodes the list
tags if tags else [] is a list, and json.dumps(...) of that list gives a string like '["t1","t2"]'. That string is then embedded in the outer json.dumps(...), so what Step Functions receives for $.TriggerTags is a JSON-encoded string, not a list. When a step reads METAFLOW_TRIGGER_TAGS it gets '["t1","t2"]' and must call json.loads to recover the list — which the step-cmd code does, so it works. However this is an unusual convention worth a comment, and it must stay consistent with the Argo side where the parameter value is also json.dumps(tags). A brief comment here would clarify the intentional double-encoding.
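A minimal round-trip makes the double encoding concrete (the tag values are illustrative, and `execution_input` stands in for what the SFN client sends):

```python
import json

tags = ["team:ml", "env:prod"]

# Inner encode: the tag list becomes the string '["team:ml", "env:prod"]'.
inner = json.dumps(tags)

# Outer encode: the full execution input, with TriggerTags as that string.
execution_input = json.dumps({"Parameters": json.dumps({}), "TriggerTags": inner})

# What Step Functions resolves for $.TriggerTags is a string, not a list...
received = json.loads(execution_input)["TriggerTags"]
assert isinstance(received, str)

# ...so the step must json.loads once more to recover the original list,
# which is exactly what the step-cmd code does with METAFLOW_TRIGGER_TAGS.
assert json.loads(received) == tags
```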
Summary

- Propagate trigger-time tags through SFN and Argo workflow executions via the `metaflow.trigger_tags` parameter and `METAFLOW_TRIGGER_TAGS` env var.

Test plan

- `test/unit/test_tag_improvements.py` — unit tests for trigger-time tag propagation and resume tag preservation

🤖 Generated with Claude Code